package cn.spark.study.core

import org.apache.spark.{SparkConf, SparkContext}
import scala.util.control.Breaks

object GroupTop3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GroupTop3").setMaster("local")
    val sc = new SparkContext(conf)
    val path = "E:\\BaiduNetdiskDownload\\Spark从入门到精通（Scala编程、案例实战、高级特性、Spark内核源码剖析、Hadoop高端）\\第40讲-Spark核心编程：高级编程之topn\\文档\\score.txt";
    val lines = sc.textFile(path)
    val pairs = lines.map(line => {
      val lineSplited = line.split(" ")
      (lineSplited(0), lineSplited(1).toInt)
    })
    val groupClassScores = pairs.groupByKey()

    val top3ClassScores = groupClassScores.map(classScore => {
      val className = classScore._1
      val scores = classScore._2
      val top3 = new Array[Int](3)
      scores.foreach(score => {
        Breaks.breakable {
          for(i <- 0 until 3) {
            if(top3(i) == 0) {
              top3(i) = score
              Breaks.break
            } else if(score > top3(i)) {
              for(j <- (i+1 to 2).reverse) {
                top3(j) = top3(j - 1)
              }
              top3(i) = score
              Breaks.break
            }
          }
        }
      })
      (className, top3)
    })

//    top3ClassScores.foreach(classScore => {
//      println("class : " + classScore._1)
//      classScore._2.foreach(score=>println(score))
//      println("=================================")
//    })

    for(classScore <- top3ClassScores) {
      println("class : " + classScore._1)
      classScore._2.foreach(score=>println(score))
      println("=================================")
    }
  }
}
